« 一个格式化文本信息版面的小玩意 | 返回首页 | 关于虚拟文件系统的一些新想法 »

为 log 实现的无锁 Ringbuffer

这两天在改 log 模块。我们需要一个并发写 log 的模块,它有多个 log 生产者一个消费者,这个唯一的消费者在 log 线程中把 log 数据持久化。

大多数 log 生产者是在第三方库的 callback 函数中调用的,比如 bgfx ,如果写 log 不够快的话,就会阻塞渲染。这个 callback 需要自己保证线程安全。因为 bgfx 支持多线程渲染,所以写 log 的 callback 可能在不同的线程触发。

过去在实现 bgfx 的 luabinding 时,我实现了一个简单的 mpsc 队列,get_log 这个函数就是那个单一消费者,它取出队列中所有的 log 信息,返回到 lua 虚拟机中。

它是用 spin_lock 实现的。这两天,我想应该可以实现一个更通用的无锁版本。

在我的需求中,log 信息是允许丢掉的。所以我开了一个固定大小的 ringbuffer 收集各个不同线程生产出来的 log ,然后在一个单一线程定期(通常是一个渲染帧一次)取出它们。只要取的频率够高,而生产的 log 数量不那么快的话,一个合适大小的 ringbuffer 就能以最简单的数据结构解决问题。

我觉得一个无锁结构的 log 系统需要两个 ringbuffer 。

我们缓存的 log 条目数目上限估计不用太大,4096 或许是个合适的数字:即,每帧不会产生超过 4000 条 log 。那么就用一个 4096 的固定数组即可。

实现这么一个 ringbuffer 需要有两个 64bit 变量,head 和 tail 。其中 tail 被多个生产者共享,所以它必须是原子变量,让多个生产者依次尾进头出这个队列 ring buffer。head 只由唯一消费者控制,不需要原子变量。写入数据保持这样的流程:

  1. index = fetch and add tail, 1
  2. buffer[index % 4096] = meta

这里只需要记录 meta 信息,而不是 log 的文本。这里的 meta 信息只这一条 log 的实际内容在另一个 ringbuffer 中的 offset 和 size 。写入 meta 信息时,需要先写 offset 再写 size。为什么是这个次序,下面会展开说。

第二个 ringbuffer 记录 log 的文本内容,可以用一个更大的队列,比如 64K 。这个 ringbuffer 只需要一个 64bit 的原子变量 ptr 。而将 log 文本写入 buffer 只需要下列的流程:

  1. offset = fetch and add ptr, size
  2. copy string to buffer + offset % 64K (回绕时,需要分两段复制)

也就是说,我们把 log 文本写入一个固定长度的 ringbuffer 时,只要不断的推进 ptr 指针,然后写入数据即可,不用考虑是否覆盖了旧数据。

而 log 的消费者负责检查数据是否还在 ringbuffer 中,或是已经被覆盖丢失。这个检查条件非常简单: offset + 64K 小于 ptr 表示该 offset 处的内容已经不在内存中。因为持有引用方记住的 offset 和 ringbuffer 自己的 ptr 都是 64bit 单调递增的,而内存中只保存有 ptr 之前 64k 的内容,比较它们两个值就能知道数据是否有效。

在第一个 ringbuffer 每个条目的 meta 信息中,我们保存有数据在第二个 buffer 中的 offset 和 size 。读取后便可以校验读到的数据是否有效。

唯一一个读取 log 的消费者可遵循这样的流程:

  1. 如果在第一个 ringbuffer 中, head == tail 表示队列为空。
  2. 如果 head 对应的 meta.size 为负数, 表示数据还没有准备好(也可以视为空)。
  3. 队列有效时,index = head++ 。递增 head 。
  4. 根据 buffer[index] 的 offset 和 size 从第二个 ringbuffer 中取出内容。
  5. buffer[index].size = -1 。赋值 size 为 -1 。这样可以标识这个 slot 是无效的。这可以保证在生产者填入数据时(最后写 size 字段),如果没填完,以上 step 2 就能检查出来。

我简单实现了一下:

https://gist.github.com/cloudwu/e8cc734a31dd01b439d8d131acc361c3

尚未测试。而且就我写并发代码,尤其是无锁结构,是很容易出错的。所以以上代码仅供参考。它的确很简单,如果有 bug 也应该很快能发现。

Comments

如果是单一消费者的话,实际上有更简便的办法的: produce: // 常规push操作,比如用__sync_val_compare_and_swap插入到表头 consume: // 实际上我们不需要实现pop操作,而是用__sync_lock_test_and_set直接将producer线程引用的表头指针steal过来,也就是说把整个链表steal过来,留一个空的表头指针给原来的producr线程 参考:"Figure 2 Public Free List" of "Richard Hudson, Bratin Saha, Ali-Reza Adl-Tabatabai, Benjamin C. Hertzberg. "McRT-Malloc: a scalable transactional memory allocator". ISMM 2006."
大有收获
https://github.com/zwzw1/NanoLog
@Cloud 不好意思,解决了,是程序本身问题。昨天发了评论修正但是今天发现评论失败了 这个tid应该是有 0 1 2 3,只有2 说明是第二个线程的内容重复写了。 [4] Hello World tid=2 1 [5] Hello World tid=2 1 [6] Hello World tid=2 1 [7] Hello World tid=2 1 问题是tmp变量没有设置为私有导致的。 加上private(tmp)可解决。 #pragma omp parallel num_threads(4) private(thread_id) private(tmp) 谢谢
@Joe output 哪里有问题? 我看到 Hello World 都是对的,最后的 i 也是 4 个一组递增的。
用openmp模型的多线程测试了一下,结果不正确。看不出问题出在哪里 output: [0] Hello World tid=0 0 [1] Hello World tid=3 0 [2] Hello World tid=3 0 [3] Hello World tid=2 0 [4] Hello World tid=2 1 [5] Hello World tid=2 1 [6] Hello World tid=2 1 [7] Hello World tid=2 1 [8] Hello World tid=3 2 [9] Hello World tid=1 2 [10] Hello World tid=1 2 [11] Hello World tid=1 2 [12] Hello World tid=3 3 [13] Hello World tid=3 3 [14] Hello World tid=3 3 [15] Hello World tid=0 3 [16] Hello World tid=0 4 [17] Hello World tid=0 4 [18] Hello World tid=0 4 [19] Hello World tid=0 4 [20] Hello World tid=2 5 [21] Hello World tid=2 5 [22] Hello World tid=1 5 [23] Hello World tid=1 5 [24] Hello World tid=0 6 [25] Hello World tid=0 6 [26] Hello World tid=0 6 [27] Hello World tid=0 6 [28] Hello World tid=3 7 [29] Hello World tid=2 7 [30] Hello World tid=2 7 [31] Hello World tid=2 7 [32] Hello World tid=1 8 [33] Hello World tid=2 8 [34] Hello World tid=2 8 [35] Hello World tid=2 8 [36] Hello World tid=1 9 [37] Hello World tid=2 9 [38] Hello World tid=2 9 [39] Hello World tid=2 9 #include "clog.h" #include #include int main() { struct log_buffer *log = log_new(); char tmp[100]; int i; int thread_id; for (i=0; i int main() { struct log_buffer *log = log_new(); char tmp[100]; int i; int thread_id; for (i=0; i< 10; i++) { #pragma omp parallel num_threads(4) private(thread_id) { thread_id = omp_get_thread_num(); //printf("thread_id = %d\n", thread_id); int n = sprintf(tmp, "Hello World tid=%d %d", thread_id, i); log_push(log, n, tmp); } } printf("push\n"); for (i = 0; i < 40; i++) { int n = log_pop(log, 100, tmp); if (n <= 0) { printf("Drop %d\n", i); } else { printf("[%d] %.*s\n", i, n, tmp); } } log_delete(log); return 0; }
如果是单一消费者的话,实际上有更简便的办法的: produce: // 常规push操作,比如用__sync_val_compare_and_swap插入到表头 consume: // 实际上我们不需要实现pop操作,而是用__sync_lock_test_and_set直接将producer线程引用的表头指针steal过来,也就是说把整个链表steal过来,留一个空的表头指针给原来的producr线程 参考:"Figure 2 Public Free List" of "Richard Hudson, Bratin Saha, Ali-Reza Adl-Tabatabai, Benjamin C. Hertzberg. "McRT-Malloc: a scalable transactional memory allocator". ISMM 2006."

Post a comment

非这个主题相关的留言请到:留言本